package com.ouyunc.mq.config.rabbitmq.strategy.impl;

import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ouyunc.mq.config.rabbitmq.enums.RabbitMqEnum;
import com.ouyunc.mq.config.rabbitmq.properties.ClusterRabbitMqProperties;
import com.ouyunc.mq.config.rabbitmq.strategy.RabbitMqStrategy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

/**
 * @Author fangzhenxun
 * @Description: 集群rabbitmq 策略配置
 * @Date 2020/2/29 17:41
 * @Version V1.0
 **/

@Slf4j
@Configuration
@ConditionalOnExpression("'${mq.rabbit.primary}'.equals('CLUSTER')")
public class ClusterRabbitMqStrategy implements RabbitMqStrategy {
    /**
     * 编码
     */
    private static  final  String ENCODE_UTF8 = "UTF-8";

    /**
     * contentType
     */
    private static  final  String CONTENT_TYPE_JSON = "application/json";
    /**
     * 集群rabbitmq的属性配置文件
     */
    @Autowired
    private ClusterRabbitMqProperties clusterRabbitMqProperties;


    /**
     * rabbitmq 监听器生产工厂配置
     **/
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer;



    /**
     * @Author fangzhenxun
     * @Description 标识该策略类是镜像集群
     * @date 2020/2/29 17:42
     * @param
     * @return com.xyt.mq.config.rabbitmq.enums.MqEnum
     */
    @Override
    public RabbitMqEnum getType() {
        return RabbitMqEnum.CLUSTER;
    }


    /**
     * @Author fangzhenxun
     * @Description 使用缓存连接池工厂来生产rabbitmq模板
     * 注意：这里不要使用new来创建建工厂
     * @date 2020/2/29 17:42
     * @param
     * @return org.springframework.amqp.rabbit.connection.ConnectionFactory
     */
    @Override
    public CachingConnectionFactory buildConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();

        //设置ip + port
        cachingConnectionFactory.setAddresses(clusterRabbitMqProperties.getAddresses());
        //设置登录用户名称
        cachingConnectionFactory.setUsername(clusterRabbitMqProperties.getUsername());
        //设置登录用户密码
        cachingConnectionFactory.setPassword(clusterRabbitMqProperties.getPassword());
        //设置虚拟vHost
        cachingConnectionFactory.setVirtualHost(clusterRabbitMqProperties.getVirtualHost());
        //是否开启消息确认（用于producer到broker之间确保消息的可靠传递）
        cachingConnectionFactory.setPublisherConfirmType(clusterRabbitMqProperties.getConfirmType());
        //是否开启消息返回（用于exchange到queue之间存在有效的路由可以将小西投递到队列中）
        cachingConnectionFactory.setPublisherReturns(clusterRabbitMqProperties.isPublisherReturns());

        //setCacheMode：设置缓存模式，共有两种，CHANNEL和CONNECTION模式。
        //1、CONNECTION模式，这个模式下允许创建多个Connection，会缓存一定数量的Connection，每个Connection中同样会缓存一些Channel，
        // 除了可以有多个Connection，其它都跟CHANNEL模式一样。
        //2、CHANNEL模式，程序运行期间ConnectionFactory会维护着一个Connection，
        // 所有的操作都会使用这个Connection，但一个Connection中可以有多个Channel，
        // 操作rabbitmq之前都必须先获取到一个Channel，
        // 否则就会阻塞（可以通过setChannelCheckoutTimeout()设置等待时间），
        // 这些Channel会被缓存（缓存的数量可以通过setChannelCacheSize()设置）；
        //设置CONNECTION模式，可创建多个Connection连接
        //cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
        //设置每个Connection中缓存Channel的数量，不是最大的。操作rabbitmq之前（send/receive message等）
        // 要先获取到一个Channel.获取Channel时会先从缓存中找闲置的Channel，如果没有则创建新的Channel，
        // 当Channel数量大于缓存数量时，多出来没法放进缓存的会被关闭。

        //cachingConnectionFactory.setChannelCacheSize(10);

        //单位：毫秒；配合channelCacheSize不仅是缓存数量，而且是最大的数量。
        // 从缓存获取不到可用的Channel时，不会创建新的Channel，会等待这个值设置的毫秒数
        //同时，在CONNECTION模式，这个值也会影响获取Connection的等待时间，
        // 超时获取不到Connection也会抛出AmqpTimeoutException异常。

        //cachingConnectionFactory.setChannelCheckoutTimeout(600);

        //仅在CONNECTION模式使用，设置Connection的缓存数量。
        //cachingConnectionFactory.setConnectionCacheSize(1);
        //setConnectionLimit：仅在CONNECTION模式使用，设置Connection的数量上限。
        //cachingConnectionFactory.setConnectionLimit(10);

        //调用后初始化方法，没有它将抛出异常
        cachingConnectionFactory.afterPropertiesSet();
        return cachingConnectionFactory;
    }

    /**
     * @Author fangzhenxun
     * @Description rabbitmq 监听器，主要是用于“并发量的配置”，表示：并发消费者的初始化值，并发消费者的最大值，每个消费者每次监听时可拉取处理的消息数量。
     * 适应于多个消费者
     * @date 2020/3/1 16:02
     * @param
     * @return org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
     */
    @Bean("rabbitListenerContainerFactory")
    public RabbitListenerContainerFactory multiListenerContainer(CachingConnectionFactory buildConnectionFactory){
        SimpleRabbitListenerContainerFactory srcFactory = new SimpleRabbitListenerContainerFactory();

        //消息转换器
        srcFactory.setMessageConverter(new MessageConverter() {

            //生产端
            @Override
            public Message toMessage(Object messageContent, MessageProperties messageProperties) throws MessageConversionException {
                ObjectMapper objectMapper = new ObjectMapper();
                byte[] bytes;
                try {
                    String msgStr = objectMapper.writeValueAsString(messageContent);
                    bytes = msgStr.getBytes(ENCODE_UTF8);
                } catch (IOException e) {
                    log.error("消息转换失败! + {}" , e.getMessage());
                    throw new MessageConversionException("消息转换失败！", e);
                }
                //设置消息属性
                messageProperties.setContentType(CONTENT_TYPE_JSON);
                messageProperties.setContentEncoding(ENCODE_UTF8);
                messageProperties.setContentLength(bytes.length);

                return new Message(bytes, messageProperties);
            }

            //消费端
            @Override
            public Object fromMessage(Message message) throws MessageConversionException {
                //设置contentType
                message.getMessageProperties().setContentType("application/json");
                message.getMessageProperties().setContentEncoding(ENCODE_UTF8);
                return JSON.toJSONString(message.getBody());
            }
        });
        //采用的应答模式，此处使用手动应答
        srcFactory.setAcknowledgeMode(clusterRabbitMqProperties.getListener().getSimple().getAcknowledgeMode());
        //并发消费者初始化数量
        srcFactory.setConcurrentConsumers(clusterRabbitMqProperties.getListener().getSimple().getConcurrency());
        //并发消费者的最大值。
        srcFactory.setMaxConcurrentConsumers(clusterRabbitMqProperties.getListener().getSimple().getMaxConcurrency());
        //每个消费者每次监听时可拉取处理的消息数量
        srcFactory.setPrefetchCount(clusterRabbitMqProperties.getListener().getSimple().getPrefetch());
        //配置rabbitmq 的监听器和缓存连接工厂
        listenerContainerFactoryConfigurer.configure(srcFactory, buildConnectionFactory);
        return srcFactory;
    }

}
